{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "env: PYICEBERG_MAX_WORKERS=300\n"
     ]
    }
   ],
   "source": [
    "%env PYICEBERG_MAX_WORKERS=300"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "#!pip install \"pyiceberg[s3fs]\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.catalog.rest import RestCatalog"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "catalog = RestCatalog(\"public\", ** {\n",
    "    \"uri\": f\"http://localhost:8181\",\n",
    "})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "catalog.create_namespace(\"public\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.schema import Schema\n",
    "from pyiceberg.types import (\n",
    "    NestedField,\n",
    "    LongType,\n",
    "    TimestampType,\n",
    "    DoubleType,\n",
    "    StringType,\n",
    ")\n",
    "\n",
    "schema = Schema(\n",
    "    NestedField(field_id=1, name=\"VendorID\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=2, name=\"tpep_pickup_datetime\", field_type=TimestampType(), required=False),\n",
    "    NestedField(field_id=3, name=\"tpep_dropoff_datetime\", field_type=TimestampType(), required=False),\n",
    "    NestedField(field_id=4, name=\"passenger_count\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=5, name=\"trip_distance\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=6, name=\"RatecodeID\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=7, name=\"store_and_fwd_flag\", field_type=StringType(), required=False),\n",
    "    NestedField(field_id=8, name=\"PULocationID\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=9, name=\"DOLocationID\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=10, name=\"payment_type\", field_type=LongType(), required=False),\n",
    "    NestedField(field_id=11, name=\"fare_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=12, name=\"extra\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=13, name=\"mta_tax\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=14, name=\"tip_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=15, name=\"tolls_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=16, name=\"improvement_surcharge\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=17, name=\"total_amount\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=18, name=\"congestion_surcharge\", field_type=DoubleType(), required=False),\n",
    "    NestedField(field_id=19, name=\"airport_fee\", field_type=DoubleType(), required=False),\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.partitioning import PartitionSpec, PartitionField\n",
    "from pyiceberg.transforms import DayTransform, MonthTransform, YearTransform, BucketTransform\n",
    "\n",
    "partition_spec = PartitionSpec(\n",
    "    PartitionField(source_id=2, field_id=1001, transform=MonthTransform(), name=\"tpep_pickup_datetime_month\"),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyiceberg.table.sorting import SortOrder, SortField\n",
    "from pyiceberg.transforms import IdentityTransform\n",
    "\n",
    "sort_order = SortOrder(\n",
    "    SortField(source_id=4, transform=IdentityTransform())\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "table = catalog.create_table(\n",
    "    identifier=\"public.nyc_taxi\",\n",
    "    schema=schema,\n",
    "    partition_spec=partition_spec,\n",
    "    sort_order=sort_order,\n",
    "    properties={\n",
    "        \"write.format.default\": \"parquet\",\n",
    "        \"write.parquet.compression-codec\": \"zstd\",\n",
    "        \"write.target-file-size-bytes\": \"536870912\",\n",
    "        \"s3.connect-timeout\": \"10000\"\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Appending files: 100%|██████████| 26/26 [11:30<00:00, 26.56s/it, Appended 2022-12]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total rows in the table: 73531304\n"
     ]
    }
   ],
   "source": [
    "import requests\n",
    "import io\n",
    "import pyarrow.parquet as pq\n",
    "from tqdm import tqdm\n",
    "from datetime import datetime, timedelta\n",
    "\n",
    "# Base URL for the Parquet files\n",
    "base_url = \"https://pub-f6a668561f5e4bd6ac651efd8c18998d.r2.dev/nyc_taxi/yellow_tripdata_{}.parquet\"\n",
    "\n",
    "# Generate a list of dates from 2020-11 to 2022-12\n",
    "start_date = datetime(2020, 11, 1)\n",
    "end_date = datetime(2022, 12, 1)\n",
    "date_list = []\n",
    "\n",
    "current_date = start_date\n",
    "while current_date <= end_date:\n",
    "    date_list.append(current_date.strftime(\"%Y-%m\"))\n",
    "    current_date += timedelta(days=32)\n",
    "    current_date = current_date.replace(day=1)\n",
    "\n",
    "# Create a progress bar\n",
    "with tqdm(total=len(date_list), desc=\"Appending files\") as pbar:\n",
    "    for date_str in date_list:\n",
    "        file_url = base_url.format(date_str)\n",
    "        \n",
    "        # Download the file content\n",
    "        file_response = requests.get(file_url)\n",
    "        if file_response.status_code != 200:\n",
    "            print(f\"Failed to download {date_str}: {file_response.status_code}\")\n",
    "            continue\n",
    "        \n",
    "        # Read the Parquet file from the response content\n",
    "        file_content = io.BytesIO(file_response.content)\n",
    "        df = pq.read_table(file_content)\n",
    "        \n",
    "        # Append to the Iceberg table\n",
    "        table.append(df)\n",
    "        \n",
    "        pbar.update(1)\n",
    "        pbar.set_postfix_str(f\"Appended {date_str}\")\n",
    "\n",
    "# Print the total number of rows in the table after appending all files\n",
    "print(f\"Total rows in the table: {len(table.scan().to_arrow())}\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
